package com.vaye.im.websocket;

import cn.dev33.satoken.session.SaSession;
import cn.dev33.satoken.stp.StpUtil;
import cn.hutool.dfa.SensitiveUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.vaye.common.constant.CacheConstant;
import com.vaye.common.constant.ImConstant;
import com.vaye.common.enums.TopicTypeEnums;
import com.vaye.common.utils.IPUtils;
import com.vaye.common.utils.RedisUtil;
import com.vaye.common.utils.SpringContextUtils;
import com.vaye.common.utils.UserSessionUtils;
import com.vaye.im.dto.MessageDTO;
import com.vaye.im.dto.TopicMsg;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * 综合聊天处理类，包含以下情况：
 * 1、用户加入会话（targetUserId和roomId都为空）
 * 2、点对点发送消息（targetUserId不为空）
 * 3、加入聊天室（roomId不为空）
 * @author wangzhiyong
 * @module admin
 * @date 2023年07月07日 上午9:52
 */
@Slf4j
@Component
@ServerEndpoint(value = "/chat",configurator = WebSocketConfigurator.class) //添加消息编码器
public class ChatWebSocket {

    private static RedisUtil redisUtil;

    @Autowired
    public void setRedisUtil(RedisUtil redisUtil) {
        ChatWebSocket.redisUtil = redisUtil;
    }

    private StringRedisTemplate stringRedisTemplate = SpringContextUtils.getBean(StringRedisTemplate.class);

    /**
     * 建立连接
     * @author wangzhiyong
     * @date 2023/7/7 上午10:34
     * @param roomId 房间号ID
     * @param session 建立连接的session会话
     */
    @OnOpen
    public void onOpen(@PathParam("roomId") String roomId, Session session){
        log.info("websocket 新客户端连入-ChatWebSocket.onOpen,roomId={}",roomId);
        if ("undefined".equals(roomId)) {
            roomId = null;
        }
        Map<String, Object> userProperties = session.getUserProperties();
        Long userId = (Long) userProperties.get(ImConstant.LOGINID);
        String nick = ((JSONObject) StpUtil.getSessionByLoginId(userId).getDataMap().get(SaSession.USER)).getString("nick");
        //加入会话
        UserSessionUtils.add(userId,session,nick);
        //维护在线列表
        redisUtil.hset(CacheConstant.ONLINE_USERS,String.valueOf(userId),nick);
        MessageDTO messageDTO = new MessageDTO();
        messageDTO.setCode(200);
        //加入房间
        String msg = "";
        if (!StringUtils.isEmpty(roomId)) {
            messageDTO.setRoomId(roomId);
            UserSessionUtils.addRoom(roomId,userId);
            //维护房间列表
            redisUtil.sSet(String.format(CacheConstant.ONLINE_ROOM_USERS_FORMAT,roomId),String.valueOf(userId));
            msg = "欢迎" + nick + "加入房间";
        }else {
            msg = "ok";
        }
        messageDTO.setMsg(msg);
        send(null,null,roomId,JSON.toJSONString(messageDTO),session);
    }

    /**
     * 服务端接收到信息后调用
     *
     * @param message
     * @param session
     */
    @OnMessage
    public void onMessage(@PathParam("targetUserId") String targetUserId,@PathParam("roomId") String roomId, String message, Session session) {
        if ("undefined".equals(targetUserId)) {
            targetUserId = null;
        }
        if ("undefined".equals(roomId)) {
            roomId = null;
        }
        Map<String, Object> userProperties = session.getUserProperties();
        Long loginId = (Long) userProperties.get(ImConstant.LOGINID);
        String host = (String) userProperties.get(ImConstant.HOST);
        String nick = ((JSONObject) StpUtil.getSessionByLoginId(loginId).getDataMap().get(SaSession.USER)).getString("nick");
        MessageDTO messageDTO = new MessageDTO();
        messageDTO.setCode(200);
        int type = 0;
        messageDTO.setFromUserId(loginId);
        //敏感词过滤
        if (SensitiveUtil.containsSensitive(message)) {
            message = SensitiveUtil.sensitiveFilter(message,true,null);
        }
        messageDTO.setMsg(nick + ":" + message);
        boolean flag = false;
        if (!ObjectUtils.isEmpty(targetUserId)) {    //点对点消息
            messageDTO.setTargetUserId(Long.valueOf(targetUserId));
            type = TopicTypeEnums.PEER_TO_PEER.getCode();
        } else if (!ObjectUtils.isEmpty(roomId)) {   //群消息
            messageDTO.setRoomId(roomId);
            messageDTO.setType(TopicTypeEnums.GROUP_CHAT.getCode());
            Set<Object> objects = redisUtil.sGet(String.format(CacheConstant.ONLINE_ROOM_USERS_FORMAT, roomId));
            List<Pair<Long, Session>> roomSessions = UserSessionUtils.getRoomSessions(roomId);
            if (roomSessions.size() < objects.size()) {
                flag = true;
            }
        }
        messageDTO.setType(type);
        send(loginId,ObjectUtils.isEmpty(targetUserId)?null:Long.valueOf(targetUserId),roomId,JSON.toJSONString(messageDTO),null);
        if (flag) {
            TopicMsg topicMsg = new TopicMsg();
            topicMsg.setType(TopicTypeEnums.GROUP_CHAT.getCode());
            topicMsg.setFromUserId(String.valueOf(loginId));
            topicMsg.setRoomId(roomId);
            topicMsg.setMsg(JSON.toJSONString(messageDTO));
            topicMsg.setIp(IPUtils.getLocalIP());
            String body = JSON.toJSONString(topicMsg);
            stringRedisTemplate.convertAndSend(CacheConstant.WS_CHAT_TOPIC,body);
        }
    }

    /**
     * 连接关闭时调用
     */
    @OnClose
    public void onClose(@PathParam("roomId") String roomId,Session session) {
        Map<String, Object> userProperties = session.getUserProperties();
        Long loginId = (Long) userProperties.get(ImConstant.LOGINID);
        String host = (String) userProperties.get(ImConstant.HOST);
        if (!StringUtils.isEmpty(roomId)) {
            redisUtil.setRemove(String.format(CacheConstant.ONLINE_ROOM_USERS_FORMAT,roomId),String.valueOf(loginId));
            UserSessionUtils.exitRoom(roomId,loginId);
        }
        redisUtil.hdel(CacheConstant.ONLINE_USERS,Long.toString(loginId));
    }

    /**
     * 服务端websocket出错时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("websocket出现错误");
        error.printStackTrace();
    }

    /**
     * 服务端发送信息给客户端
     * @param fromUserId 发送用户ID
     * @param targetUserId 用户ID
     * @param roomId 房间号
     * @param message 发送的消息
     * @param session 会话(接入连接的时候需要传会话)
     */
    public void send(Long fromUserId,Long targetUserId,String roomId, String message,Session session){
        log.info("#### 点对点消息，userId={}", targetUserId);
        if (UserSessionUtils.isEmpty()) {
            log.warn("当前无websocket连接");
            return;
        }
        try {
            if (ObjectUtils.isEmpty(roomId) && !ObjectUtils.isEmpty(session)) { //接入连接且非群聊时消息发送
                session.getBasicRemote().sendText(message);
            } else if (!ObjectUtils.isEmpty(targetUserId)) { //点对点消息
                UserSessionUtils.getSession(targetUserId).getBasicRemote().sendText(message);
            } else {    //群消息
                List<Pair<Long, Session>> roomSessions = UserSessionUtils.getRoomSessions(roomId);
                if (CollectionUtils.isEmpty(roomSessions)) {
                    log.error("room={}房间0人在线",roomId);
                }
                roomSessions.stream()
                        .forEach(e ->{
                            if (!ObjectUtils.isEmpty(fromUserId) && fromUserId.equals(e.getFirst())) {
                                return;
                            }
                            try {
                                e.getSecond().getBasicRemote().sendText(message);
                            } catch (IOException exception) {
                                exception.printStackTrace();
                            }
                        });
            }
        } catch (IOException exception) {
            log.error("消息发送消息失败，fromUserId={},targetUserId={},message={}",fromUserId,targetUserId,message);
            exception.printStackTrace();
        }
    }
}
